package day02.transform;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink 流处理 API - transform
 * <p>
 * 抽离代码
 *
 * @author lvbingbing
 * @date 2021-12-14 21:26
 */
public class FlinkTransform00 {

    /**
     * 执行环境
     */
    private StreamExecutionEnvironment env;

    /**
     * 从 input/sensor.txt 读取到的数据
     */
    private DataStream<String> inputStream;

    /**
     * 有参构造器会初始化成员变量
     *
     * @param parallelism 并行度
     */
    public FlinkTransform00(int parallelism) {
        init(parallelism);
    }

    public FlinkTransform00() {

    }

    /**
     * 初始化
     *
     * @param parallelism 并行度
     */
    public void init(int parallelism) {
        // 1、创建一个表示当前执行程序的上下文的执行环境。 如果程序是独立调用的，则此方法返回一个本地执行环境
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        // 2、从文件读取数据
        inputStream = env.readTextFile("input/sensor.txt");
    }

    /**
     * 获取可执行环境
     *
     * @return 可执行环境
     */
    public StreamExecutionEnvironment getEnv() {
        return env;
    }

    /**
     * 获取 从 input/sensor.txt 读取到的数据
     *
     * @return <br>
     */
    public DataStream<String> getInputStream() {
        return inputStream;
    }

    /**
     * 获取SensorReading类型的数据流
     *
     * @return <br>
     */
    public DataStream<SensorReading> getSensorReadingStream() {
        return inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
    }
}
